package com.lagou.rpc.consumer;

import com.lagou.rpc.consumer.client.RpcClient;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.ZkClient;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Spring Boot entry point of the RPC consumer.
 *
 * <p>On startup it connects to ZooKeeper, creates one {@link RpcClient} per child node
 * found under {@link #path}, subscribes to child changes so that the set of clients
 * follows nodes appearing and disappearing, and starts a periodic task that rewrites
 * the third {@code ':'}-separated field of every tracked node's data.
 *
 * @Author Matures
 * @CreateTime 2021/4/12 22:45
 **/
@SpringBootApplication
public class ClientBootStrapApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(ClientBootStrapApplication.class, args);
    }

    /**
     * RPC clients keyed by ZooKeeper child node name. A concurrent map is used because
     * it is modified from the child-change callback as well as from the startup thread.
     */
    public static Map<String, RpcClient> maps = new ConcurrentHashMap<>();

    /** Parent ZooKeeper path whose children are the registered server nodes. */
    public static final String path = "/lg-rpc";

    /**
     * Names of the child nodes a client currently exists for. Copy-on-write so the
     * scheduled task can iterate it while the child-change callback adds or removes
     * entries, without a {@link ConcurrentModificationException}.
     */
    private static final List<String> children = new CopyOnWriteArrayList<>();

    public static ZkClient zkClient;

    private static final ScheduledExecutorService scheduledExecutorService =
            Executors.newScheduledThreadPool(10);

    /**
     * Connects to ZooKeeper, creates a client for every existing child node, registers
     * the child-change listener, starts the periodic update task and then blocks.
     *
     * @param args command line arguments (unused)
     * @throws Exception if the initial ZooKeeper access fails or the sleep is interrupted
     */
    @Override
    public void run(String... args) throws Exception {
        // Fetch every server node currently registered in ZooKeeper.
        zkClient = new ZkClient("127.0.0.1:2181");
        for (String child : zkClient.getChildren(path)) {
            if (tryConnect(child)) {
                children.add(child);
            }
        }

        // Register the listener that keeps the clients in sync with the child list.
        zkClient.subscribeChildChanges(path, new IZkChildListener() {
            @Override
            public void handleChildChange(String s, List<String> list) throws Exception {
                syncConnections(list);
            }
        });

        updateNodeValue();
        // Keeps this runner thread parked for Integer.MAX_VALUE milliseconds (about 24.8 days).
        Thread.sleep(Integer.MAX_VALUE);
    }

    /**
     * Reconciles the tracked nodes with the child list reported by ZooKeeper: creates
     * clients for nodes that are new and closes clients of nodes that are gone.
     *
     * @param currentChildren all child nodes after the change; {@code null} is treated
     *                        as "no children" (zkclient documents passing {@code null}
     *                        when the parent path itself was deleted)
     */
    private static void syncConnections(List<String> currentChildren) {
        List<String> current = currentChildren == null
                ? Collections.<String>emptyList()
                : currentChildren;

        // Present now but not tracked yet: create a new connection.
        for (String node : current) {
            if (!children.contains(node) && tryConnect(node)) {
                children.add(node);
            }
        }

        // Tracked but no longer present: close the connection. The removals are computed
        // on a copy first so the tracked list is never modified while being iterated.
        List<String> removed = new ArrayList<>(children);
        removed.removeAll(current);
        for (String node : removed) {
            children.remove(node);
            closeConnect(node);
        }
    }

    /**
     * Reads a node's data and creates its client, reporting instead of propagating
     * failures so that one bad node does not prevent the others from being processed.
     *
     * @param node child node name under {@link #path}
     * @return {@code true} if the client was created, {@code false} otherwise
     */
    private static boolean tryConnect(String node) {
        try {
            Object data = zkClient.readData(path + "/" + node);
            createConnect(node, data);
            return true;
        } catch (RuntimeException e) {
            System.err.println("Failed to connect to node " + node + ": " + e);
            return false;
        }
    }

    /**
     * Creates an {@link RpcClient} for a node and stores it in {@link #maps}. If a client
     * was already stored under the same key it is closed so it does not leak.
     *
     * @param path child node name used as the key in {@link #maps}
     * @param data node data; its string form must contain at least two {@code ':'}-separated
     *             fields, the second being the port number
     * @throws IllegalArgumentException if {@code data} is {@code null}, has no second
     *                                  field, or the port is not a number
     */
    public static void createConnect(String path, Object data) {
        if (data == null) {
            throw new IllegalArgumentException("No data for node " + path);
        }
        String[] fields = data.toString().split(":");
        if (fields.length < 2) {
            throw new IllegalArgumentException("Malformed data for node " + path + ": " + data);
        }
        // NOTE(review): the host is hard-coded; the first field of the data is presumably
        // the server host - confirm before switching to it.
        RpcClient rpcClient = new RpcClient("127.0.0.1", Integer.parseInt(fields[1]));
        RpcClient previous = maps.put(path, rpcClient);
        if (previous != null) {
            previous.close();
        }

        System.out.println("客户端与："+path+"的连接已建立");
    }

    /**
     * Closes and forgets the client stored for a node. Does nothing if no client is
     * stored under that key.
     *
     * @param path child node name used as the key in {@link #maps}
     */
    public static void closeConnect(String path) {
        // Remove first so the entry is gone even if close() fails.
        RpcClient rpcClient = maps.remove(path);
        if (rpcClient == null) {
            return;
        }
        rpcClient.close();
        System.out.println("客户端与："+path+"的连接已关闭");
    }

    /**
     * Schedules a task that, every five seconds (after an initial five-second delay),
     * rewrites each tracked node's data as {@code field0:field1:n} where {@code n} is a
     * random integer in {@code [0, 5)}. The original comments describe that third field
     * as the consumed time.
     */
    public void updateNodeValue() {
        scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
            private final Random random = new Random();

            @Override
            public void run() {
                for (String child : children) {
                    String nodePath = path + "/" + child;
                    try {
                        Object data = zkClient.readData(nodePath);
                        if (data == null) {
                            continue;
                        }
                        String[] split = data.toString().split(":");
                        if (split.length < 2) {
                            System.err.println("Skipping node with malformed data " + nodePath + ": " + data);
                            continue;
                        }
                        int i = random.nextInt(5);
                        zkClient.writeData(nodePath, split[0] + ":" + split[1] + ":" + i);
                        System.out.println(nodePath+"节点的值修改成功："+i);
                    } catch (RuntimeException e) {
                        // An exception escaping a scheduleWithFixedDelay task suppresses all
                        // subsequent executions, so failures are contained per node (for
                        // example when the node was deleted after the list was read).
                        System.err.println("Failed to update node " + nodePath + ": " + e);
                    }
                }
            }
        }, 5, 5, TimeUnit.SECONDS);
    }

}
